package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"github.com/gomodule/redigo/redis"
	"go.uber.org/zap/buffer"
	"src/utils"
	"sync"
	"time"
	"unsafe"
)

type SubscribeCallback func(channel, message string)

type Subscriber struct {
	client redis.PubSubConn
	cbMap  map[string]SubscribeCallback
}

func Publish() {
	client, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialDatabase(0), redis.DialPassword("123456"))
	if err != nil {
		utils.Log.Info("subscribe dial failed.")
	}

	if err != nil {
		utils.Log.Info("subscribe dial failed.")
	}
	defer client.Close()

	_, err = client.Do("Publish", "test_chan1", "hello")
	if err != nil {
		utils.Log.Info("subscribe Publish failed.")
	}

	_, err = client.Do("Publish", "test_chan2", "hello")
	if err != nil {
		utils.Log.Info("subscribe Publish failed.")
	}

	_, err = client.Do("Publish", "test_chan3", "hello")
	if err != nil {
		utils.Log.Info("subscribe Publish failed.")
	}

}
func init() {

}
func (c *Subscriber) conn(ip string, port string) *Subscriber {

	conn, err := redis.Dial("tcp", ip+":"+port, redis.DialDatabase(0), redis.DialPassword("123456"))
	if err != nil {
		utils.Log.Info("subscribe dial failed.")
	}
	c.client = redis.PubSubConn{conn}
	c.cbMap = make(map[string]SubscribeCallback)
	return c
}

// 订阅主题
func (c *Subscriber) Connect(ip string, port string) {
	var i int
	for {
		var pool = newPool()
		i++
		ci := pool.Get()
		fmt.Println(ci)
		defer ci.Close()
		utils.Log.Info("wait...")
		c.client = redis.PubSubConn{ci}
		c.cbMap = make(map[string]SubscribeCallback)
		switch res := c.client.Receive().(type) {
		case redis.Message:
			channel := (*string)(unsafe.Pointer(&res.Channel))
			message := (*string)(unsafe.Pointer(&res.Data))
			c.cbMap[*channel](*channel, *message)
		case redis.Subscription:
			fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
		case error:
			// 必须要先声明defer，否则不能捕获到panic异常
			fmt.Println("断网 服务器挂掉")
			fmt.Println(res.Error()) // 这里的err其实就是panic传入的内容，55
			time.Sleep(2 * time.Second)
			utils.Log.Error("error handle...")
			panic(res.Error())
		}
	}
}

func (c *Subscriber) Close() {
	err := c.client.Close()
	if err != nil {
		utils.Log.Error("subscribe close error.")
	}
}

func (c *Subscriber) Subscribe(channel interface{}, cb SubscribeCallback) {
	err := c.client.Subscribe(channel)
	if err != nil {
		utils.Log.Error("subscribe Subscribe error.")
	}

	c.cbMap[channel.(string)] = cb
}

func TestCallback1(chann, msg string) {
	utils.Log.Info("TestCallback1 channel : ", chann, " message : ", msg)
}

func TestCallback3(chann, msg string) {
	utils.Log.Info("TestCallback3 channel : ", chann, " message : ", msg)
}
func run() {
	defer func() {
		fmt.Println("fffffffffffffffffff")
		time.Sleep(2 * time.Second)
	}()
	//sub.cbMap["chat"] = TestCallback1

	start := time.Now()
	wg := sync.WaitGroup{}
	// 增加阻塞计数器
	wg.Add(1)

	//for {

	go func() {
		defer func() {
			fmt.Println("触发回调")
			fmt.Println("999999999999999999999999")
			if err := recover(); err != nil {

				fmt.Println(err) // 这里的err其实就是panic传入的内容，55
				time.Sleep(2 * time.Second)
			}
			fmt.Println("1111111111111111111111111111")
			// 扣减阻塞计数器
			run()
			wg.Done()
		}()

		var sub Subscriber
		sub.Connect("127.0.0.1", "6379")
	}()
	// 等待阻塞计数器到 0
	wg.Wait()
	d := time.Since(start)
	fmt.Println("使用WaitGroup阻塞了：", d)

	//}
}
func newPool() *redis.Pool {
	return &redis.Pool{
		MaxIdle:   50,
		MaxActive: 50, // max number of connections
		Wait:      true,
		Dial: func() (redis.Conn, error) {
			var c redis.Conn
			var err error
			//for err != nil {
			c, err = redis.Dial("tcp", ":6379", redis.DialDatabase(1), redis.DialPassword("123456"))
			if err != nil {
				fmt.Println("-----------------------链接失败")
				panic(err.Error())
			}
			//}
			return c, err
		},
	}

}

var cbk int
var one int64

func TestCallback2(chann string, msg string) {
	//log.Println("testcallback ----", string(msg))
	cbk++
	buf := bytes.NewBufferString(msg)
	//bf.ReadBytes('\n')
	btime, err := buf.ReadBytes('\n')
	if err != nil {
		fmt.Println(err)
		return
	}
	stime := int64(binary.BigEndian.Uint64(btime))
	//id, err := buf.ReadBytes('\n')
	sid := int64(binary.BigEndian.Uint64(buf.Bytes()))
	//stime := string(btime)
	//sid := string(id)
	//stime := utils.BytesToInt64(btime)
	//sid := utils.BytesToInt64(id)
	if sid%50000 == 0 || cbk%50000 == 0 {
		if one == 0 {
			one = stime
		}
		x := time.Now().Unix() - one
		utils.Log.Info("channel : ", chann, " start time : ", stime, ";时长：", x, ";id:", sid, ";index", cbk, ";one:", one)
	}
}

func run1() {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println(err) // 这里的err其实就是panic传入的内容，挂掉后1秒后 重新执行自己
			time.Sleep(1 * time.Second)
			run1()
		}
	}()
	var i int
	var pool = newPool()

	con := pool.Get()

	fmt.Println(con)
	defer con.Close()
	c := redis.PubSubConn{con}
	c.Subscribe("chat")
	for {
		i++
		receive := c.Receive()
		if receive == nil {
			panic("receive err.Error()")
			//continue fe
		}
		switch res := receive.(type) {
		case redis.Message:
			channel := (*string)(unsafe.Pointer(&res.Channel))
			message := (*string)(unsafe.Pointer(&res.Data))
			//fmt.Println("--------------------")
			go TestCallback2(*channel, *message)
		case redis.Subscription:
			fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
		case error:
			// 必须要先声明defer，否则不能捕获到panic异常
			fmt.Println("断网 服务器挂掉")
			panic(res.Error())
		}
	}

}
func main() {
	utils.Log.Info("===========main start============")
	//接收订阅消息
	go run1()
	//发布订阅消息
	time.Sleep(1 * time.Second)

	pus()

}

// 生成唯一id 方便记录跟踪
var idx int64

func pus() {
	defer func() {
		fmt.Println("pus defer")
		time.Sleep(1 * time.Second)
		if err := recover(); err != nil {
			fmt.Println(err) // 这里的err其实就是panic传入的内容，55
		}
		pus()
	}()

	//var i int64
	var pool = newPool()
	c := pool.Get()
	defer c.Close()
	start := time.Now().Unix()
	fmt.Println(start)

	for {
		idx++
		msg := buffer.Buffer{}
		//buf := make([]byte, 8)
		//binary.BigEndian.PutUint64(buf, uint64(start))
		////写成函数调用总报错 不知道为啥
		buf := utils.Int64ToBytes(start)
		msg.Write(buf)
		//msg.WriteString("aaaaaaaaaaaaaaa")
		msg.WriteByte('\n')
		//buf2 := make([]byte, 8)
		//binary.BigEndian.PutUint64(buf2, uint64(idx))
		buf2 := utils.Int64ToBytes(idx)
		msg.Write(buf2)
		//推送string 和bytes 性能几乎是一样的
		data := string(msg.Bytes())
		//用send 比do快5倍左右，实测到10秒发送92.5w，平均每秒9.2万，丢失2418条,do方法推送10秒16w条左右，丢失2条。
		// 建议 发布方也加个订阅消息，接收方拿到消息回复一个状态，发送方定时轮训已发消息的状态，一定时间内没接收到，重新发一次。
		err := c.Send("Publish", "chat", data)
		//_, err := c.Do("Publish", "chat", data)
		if err != nil {
			idx--
			panic(err)
			//return "", err
		} else {
			//send 方法 需要使用  c.Flush() 推送
			err2 := c.Flush()
			if err2 != nil {
				idx--
				panic(err2)
			}
		}
		if idx%10000 == 0 {
			//time.Sleep(1 * time.Second)
		}
		//fmt.Println(idx)

		//time.Sleep(1 * time.Millisecond)
		//return result, nil
	}
}
